#include "config.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "disk.h"
#include "dbg.h"
#include "buffer_sgl.h"
#include "../schedule/spdk_extra.h"
#include "buffer.h"
#include "core.h"

#define SPDK_MAX_SGL_COUNT      3       //becarefull to modifty it, except you really unerstand it.

//uint64_t ctrl = 0;
//const diskloc_t *dloc = NULL;

static int spdk_disk_io_pread(const disk_t *disk, char *buf, size_t size,  off_t offset);
static int  spdk_disk_io_pwrite(const disk_t *disk, char *buf, size_t size,  off_t offset);

/*
inline static int __mem_init(int len, off_t _offset, mem_handler_t **_mem_handler, off_t *new_offset, size_t *new_len)
{
        uint32_t buflen = len;
        mem_handler_t *mem_handler = NULL;
        int ret;

        *new_offset = (_offset / PAGE_SIZE) * PAGE_SIZE;

        buflen += _offset - *new_offset;
        if (buflen % PAGE_SIZE) {
                buflen = (buflen / PAGE_SIZE) * PAGE_SIZE + PAGE_SIZE;
        }

        ret = mem_get_memhandler(&mem_handler);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        

        ret = mem_hugepage_align_new(buflen, mem_handler, PAGE_SIZE);
        if(unlikely(ret))
                GOTO(err_free, ret);

        *_mem_handler = mem_handler;
        *new_len = buflen;
        return 0;

err_free:
        mem_handler->in_use = 0;
err_ret:
        return ret;
}
*/

#if 0
inline static int __aio_read_sgl_init(buffer_t *buf, sgl_element_t *sgl, int *sgl_count, 
                off_t *new_offset, size_t *new_len, mem_handler_t **_mem_handler)
{
        int seg_count = 0, ret;
        struct list_head *pos;
        //size_t new_size = buf->len;
        //mem_handler_t *mem_handler;
        //off_t old_offset = *new_offset;
        seg_t *seg = (seg_t *)buf->list.next;

        list_for_each (pos, &buf->list) {
                seg_count++;
        }
        
        /*if (unlikely(seg_count > 1) || buf->len % 512 || old_offset % 4096 || (uint64_t)seg->handler.ptr % 4096 != 0) {

                DBUG("Performance degradation[aio_read_sgl_init]: seg_count: %d, size: %u, offset: %lu, ptr: %lu.\n",
                                seg_count, buf->len, old_offset, (uint64_t)seg->handler.ptr);
                ret =  __mem_init(buf->len,  old_offset,  &mem_handler, new_offset, &new_size);
                if(unlikely(ret))
                        GOTO(err_ret, ret);

                sgl->offset = 0;
                sgl->sgl_base = mem_handler->ptr;
                sgl->sgl_len = new_size;
                sgl->phyaddr = mem_handler->phyaddr;
                 
                DBUG("for bug test the io ptr %p, new size %lu, size %u new offset %lu, offset %lu\n", sgl->sgl_base, new_size, buf->len, *new_offset, old_offset);
                *sgl_count = 1;
                *_mem_handler = mem_handler;
                *new_len = new_size;

        } 
        else */{
                *new_len = buf->len;

                ret = mbuffer_sg_trans_sgl(sgl, sgl_count, seg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}


inline static int __aio_write_sgl_init(const buffer_t *buf, sgl_element_t *sgl, int *sgl_count, 
                off_t *new_offset,  size_t *new_len, mem_handler_t **_mem_handler, void *fd)
{
        int seg_count = 0, ret;
        struct list_head *pos;
        //off_t old_offset = *new_offset;
        //size_t new_size = 0;
        //mem_handler_t *mem_handler;
        seg_t *seg = (seg_t *)buf->list.next;

        list_for_each (pos, &buf->list) {
                seg_count++;
        }
        
        /*if (unlikely(seg_count > 1) || buf->len % 512 || old_offset % 512 || (uint64_t)seg->handler.ptr % 4096 != 0) {
                DBUG("Performance degradation[aio_write_sgl_init]: seg_count: %d, size: %u, offset: %lu, ptr: %lu.\n",
                                seg_count, buf->len, old_offset, (uint64_t)seg->handler.ptr);

                ret =  __mem_init(buf->len, old_offset, &mem_handler, new_offset, &new_size);
                if(unlikely(ret))
                        GOTO(err_ret, ret);

                if (new_size != buf->len){
                        ret =  __spdk_disk_io_pread((void *)fd, mem_handler->ptr,  new_size, *new_offset);
                        if (unlikely(ret != (int)new_size))
                                GOTO(err_free_hugepage, ret);
                }

                YASSERT(old_offset >= *new_offset);
                mbuffer_get(buf, mem_handler->ptr + (old_offset - *new_offset), buf->len);
                
                sgl->offset = 0;
                sgl->sgl_base = mem_handler->ptr;
                sgl->sgl_len = new_size;
                sgl->phyaddr = mem_handler->phyaddr;
                 
                DBUG("for bug test the io ptr %p, new size %lu, size %u new offset %lu, offset %lu\n", sgl->sgl_base, new_size, buf->len, *new_offset, old_offset);
                *sgl_count = 1;
                *_mem_handler = mem_handler;
                *new_len = new_size;

        } else*/ {
                *new_len = buf->len;
                
                ret = mbuffer_sg_trans_sgl(sgl, sgl_count, seg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;

err_free_hugepage:
        mem_hugepage_align_deref(mem_handler);
err_ret:
        return ret;
}

#endif

static int spdk_disk_aio_read(const disk_t *disk, const chkid_t *chkid, buffer_t *buf, off_t offset, int prio)
{
        int ret, sgl_count = SPDK_MAX_SGL_COUNT;
        //uint64_t offset;
        //off_t new_offset;
        sgl_element_t sgl[SPDK_MAX_SGL_COUNT];
        //size_t new_len;
        seg_t *seg = (seg_t *)buf->list.next;
        //mem_handler_t *mem_handler = NULL;
        (void )prio;
        (void) chkid;

        /*ret = disk_get_iofd(loc, (void *)&fd, &offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);*/

        ret = mbuffer_sg_trans_sgl(sgl, &sgl_count, seg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*new_offset = offset + _offset;
        ret =  __aio_read_sgl_init(buf, sgl, &sgl_count, &new_offset, &new_len, &mem_handler);
        if(unlikely(ret))
                GOTO(err_ret, ret);*/

        ret = spdk_aio_preadv(disk->disk_fd, sgl, sgl_count, buf->len, offset);
        if (unlikely(ret)) {
                //DERROR("disk[%u] IOError %s\n", loc->diskid, strerror(ret));
                YASSERT(0);
                //diskmd_remove(loc->diskid);
                EXIT(EAGAIN);
        }

        /*if (unlikely(mem_handler)) {
                mbuffer_put(buf, mem_handler->ptr + (offset + _offset - new_offset), buf->len);
                mem_hugepage_align_deref(mem_handler);
        }*/

        return 0;
err_ret:
        return ret;
}

static int spdk_disk_aio_write(const disk_t *disk, const chkid_t *chkid, const buffer_t *buf, off_t offset, int prio)
{
        int ret, sgl_count = SPDK_MAX_SGL_COUNT;
        //uint64_t offset;
        //off_t new_offset;
        sgl_element_t sgl[SPDK_MAX_SGL_COUNT];
        //size_t new_len = buf->len;
        ///mem_handler_t *mem_handler = NULL;
        seg_t *seg = (seg_t *)buf->list.next;

        (void)prio;
        (void) chkid;
        /*ret = disk_get_iofd(loc, (void *)&fd, &offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);*/

        /*new_offset = offset + _offset;
        ret = __aio_write_sgl_init(buf, sgl, &sgl_count, &new_offset, &new_len, &mem_handler, disk->disk_fd);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
        */

        ret = mbuffer_sg_trans_sgl(sgl, &sgl_count, seg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = spdk_aio_pwritev(disk->disk_fd, sgl, sgl_count, buf->len, offset);
        if (unlikely(ret)) {
                //DERROR("disk[%u] IOError %s\n", loc->diskid, strerror(ret));
                YASSERT(0);
                //diskmd_remove(loc->diskid);
                EXIT(EAGAIN);
        }

       // if (unlikely(mem_handler))
        //        mem_hugepage_align_deref(mem_handler);

        return 0;

err_ret:
        return ret;
}

#if 0
static int spdk_disk_io_pwritev(const disk_t * disk, const  buffer_t *buf, off_t _offset)
{
        int ret;
        int sgl_count = SPDK_MAX_SGL_COUNT;
        sgl_element_t sgl[SPDK_MAX_SGL_COUNT];
        seg_t *seg = (seg_t *)buf->list.next;

        DBUG("spdk iov write _offset %lu, len %u \n", _offset, buf->len);

        ret = mbuffer_sg_trans_sgl(sgl, &sgl_count, seg);
        if (unlikely(ret))
                GOTO(err_ret, ret);


        if (core_self()) {
                struct non_aligned_io_t *io;
                ret = ymalloc((void **)&io, sizeof(*io));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                io->fd = disk->disk_fd;
                io->size = buf->len;
                io->offset = _offset;
                io->sgl = sgl;
                io->sgl_count = sgl_count;
                io->type = PWRITEV;
                io->schedule_task = schedule_task_get();

                ret = sy_spin_lock(&handle_io.lock);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                list_add_tail(&io->hook, &handle_io.io_list);
                sy_spin_unlock(&handle_io.lock);

                sem_post(&handle_io.sem);
                DBUG(" 3. sem_post: spdk_disk_io_pwritev[%d], io: %p \n", io->type, io);

                ret = schedule_yield("spdk_disk_io_pwritev", NULL, NULL);
                if (unlikely(ret)) {
                        DWARN("spdk_disk_io_pwritev task retval %u\n", ret);
                        yfree((void **)&io);
                        GOTO(err_free, ret);
                }

                yfree((void **)&io);

        } else {
                ret = spdk_pwritev(disk->disk_fd,  sgl, sgl_count, buf->len,  _offset); 
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }
        }
        
        return 0;

err_free:
err_ret:
        YASSERT(0);
        return ret;
}
static int spdk_disk_io_preadv(const disk_t * disk, buffer_t *buf, off_t  _offset)
{
        int ret;
        int aligned = 0;
        sgl_element_t sgl;
        
        DBUG("spdk iov read _offset %lu,  len %u \n", _offset,  buf->len);
        
        if(likely(mbuffer_is_aligned(buffer, PAGE_SIZE) && (_offset % PAGE_SIZE) == 0)) {
                sgl.sgl_base = mem_handler->ptr;
                aligned = 1;
        }  
        else {
                ret = ymalloc(&sgl.sgl_base, buf->len);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
        
        sgl.offset = 0;
        sgl.sgl_len = buf->len;
        sgl.phyaddr = 0;

        //YASSERT(_offset >= new_offset);
        if (core_self()) {
                struct non_aligned_io_t *io;
                ret = ymalloc((void **)&io, sizeof(*io));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                io->fd = (void *)disk->disk_fd;
                io->size = buf->len;
                io->offset = _offset;
                io->sgl = &sgl;
                io->sgl_count = 1;
                io->type = PREADV;
                io->schedule_task = schedule_task_get();

                ret = sy_spin_lock(&handle_io.lock);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                list_add_tail(&io->hook, &handle_io.io_list);
                sy_spin_unlock(&handle_io.lock);

                sem_post(&handle_io.sem);
                DBUG("3. sem_post: spdk_disk_io_preadv[%d], io: %p \n", io->type, io);

                ret = schedule_yield("spdk_disk_io_preadv", NULL, NULL);
                if (unlikely(ret)) {
                        DWARN("spdk_disk_io_preadv task retval %u\n", ret);
                        GOTO(err_ret, ret);
                }

                yfree((void **)&io);

        } else {
                ret = spdk_preadv(disk->disk_fd, &sgl, 1,  buf->len, _offset); 
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        if(unlikely(!aligned)) {
                mbuffer_appendmem(buf, sgl.sgl_base, buf->len);
                yfree(&sgl.sgl_base);
        }
        
        //mem_hugepage_align_deref(mem_handler);
        return 0;
err_ret:
       //mem_hugepage_align_deref(mem_handler);
        if(unlikely(!aligned))
                yfree(&sgl.sgl_base);

        return ret;
}
#endif
static int spdk_disk_io_pread(const disk_t *disk, char *buf, size_t size,  off_t offset)
{
        int ret;
        //uint64_t offset;
         sgl_element_t sgl;
        
   /*     ret = disk_getfd(loc, &fd, &offset);
        if (ret)
                GOTO(err_ret, ret);*/
        
        DBUG("spdk io pread size %lu, offset %lu\n", size, offset);
        YASSERT(offset % 512  == 0);

        sgl.offset = 0;
        sgl.sgl_base = buf;
        sgl.sgl_len = size;
        sgl.phyaddr = 0;
        
        if (core_self()) {
                struct non_aligned_io_t *io;
                ret = ymalloc((void **)&io, sizeof(*io));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                io->fd = disk->disk_fd;
                io->size = size;
                io->offset = offset;
                io->sgl = &sgl;
                io->sgl_count = 1;
                io->type = PREAD;
                io->schedule_task = schedule_task_get();
                ret = sy_spin_lock(&handle_io.lock);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                list_add_tail(&io->hook, &handle_io.io_list);

                sy_spin_unlock(&handle_io.lock);

                sem_post(&handle_io.sem);
                DBUG(" 3. sem_post: spdk_disk_io_pread[%d], io: %p \n", io->type, io);

                ret = schedule_yield("spdk_disk_io_pread", NULL, NULL);
                if (unlikely(ret <= 0)) {
                        DWARN("task retval %u\n", -ret);
                        //        diskmd_remove(loc->diskid);
                        EXIT(EAGAIN);
                }

                yfree((void **)&io);

        } else {
                ret = spdk_pread(disk->disk_fd, &sgl,  offset); 
                if (unlikely(ret <= 0)) {
                        DWARN("task retval %u\n", -ret);
                        EXIT(EAGAIN);
                }
        }

        return ret;
}

static int  spdk_disk_io_pwrite(const disk_t *disk, char *buf, size_t size,  off_t offset)
{
        int ret;
        sgl_element_t sgl;

       /* ret = disk_getfd(loc, &fd, &offset);
        if (ret)
                GOTO(err_ret, ret);*/

        DBUG("spdk io pwrite size %lu, offset %lu\n", size, offset);
        YASSERT(offset % 512 == 0);
        sgl.offset = 0;
        sgl.sgl_base = buf;
        sgl.sgl_len = size;
        sgl.phyaddr = 0;

        if (core_self()) {
                struct non_aligned_io_t *io;
                ret = ymalloc((void **)&io, sizeof(*io));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                io->fd = disk->disk_fd;
                io->size = size;
                io->offset = offset;
                io->sgl = &sgl;
                io->sgl_count = 1;
                io->type = PWRITE;
                io->schedule_task = schedule_task_get();
                ret = sy_spin_lock(&handle_io.lock);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                list_add_tail(&io->hook, &handle_io.io_list);

                sy_spin_unlock(&handle_io.lock);

                sem_post(&handle_io.sem);
                DBUG(" 3. sem_post: spdk_disk_io_pwrite[%d], io: %p \n", io->type, io);

                ret = schedule_yield("spdk_disk_io_pwrite", NULL, NULL);
                if (unlikely(ret <= 0)) {
                        DWARN("task retval %u\n", -ret);
                        //        diskmd_remove(loc->diskid);
                        EXIT(EAGAIN);
                }

                yfree((void **)&io);

        } else {
                ret = spdk_pwrite(disk->disk_fd, &sgl, offset); 
                if (unlikely(ret <= 0)) {
                        DWARN("task retval %d\n",  -ret);
                        YASSERT(0);
                        EXIT(EAGAIN);
                }
        }

        return ret;
}

static int spdk_disk_create_new(disk_t *disk, const char *home, const char *pool)
{
        int ret;
        diskinfo_t diskinfo;
        char path[MAX_PATH_LEN];

        ret = disk_setinfo(home, pool, disk, &diskinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snprintf(path, MAX_PATH_LEN, "%s/info/%d.info", home, disk->idx);

        ret = _set_value(path, (void *)&diskinfo, sizeof(diskinfo), O_CREAT | O_TRUNC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = bmap_set(&disk->bmap, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fsync(disk->map_fd);
        if (ret)
                UNIMPLEMENTED(__DUMP__);

        return 0;
err_ret:
        return ret;
}


static int spdk_disk_probe_check(disk_t *disk, const char *home, const char *pool)
{
        int ret;
        diskinfo_t diskinfo, _diskinfo;
        char path[MAX_PATH_LEN], uuid[MAX_NAME_LEN], _uuid[MAX_NAME_LEN];

        if (disk->status & __DISK_OFFLINE__) {
                DWARN("disk[%u] offline\n", disk->idx);
                return 0;
        }

        if (bmap_get(&disk->bmap, 0) == 0) {
                DINFO("new disk[%u], need init\n", disk->idx);

                ret = disk->dop->create_new(disk, home, pool);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

        } else {
                ret = disk_getinfo(home, disk, &diskinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                snprintf(path, MAX_PATH_LEN, "%s/info/%d.info", home, disk->idx);

                ret = _get_value(path, (void *)&_diskinfo, sizeof(_diskinfo));
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_ret, ret);
                }

                if (memcmp(&diskinfo, &_diskinfo, sizeof(diskinfo))) {
                        uuid_unparse(diskinfo.diskid, uuid);
                        uuid_unparse(_diskinfo.diskid, _uuid);
                        DWARN("bad disk, id %s : %s, cluster %s : %s, idx %u : %u\n",
                                        uuid, _uuid, diskinfo.cluster, _diskinfo.cluster,
                                        diskinfo.idx, _diskinfo.idx);
                        EXIT(EIO);
                }
        }

        return 0;
err_ret:
        return ret;
}

#if 0
static int __spdk_diskmd_setsuperblock(const disk_t *disk, diskinfo_t *diskinfo)
{
        int ret, i;
        struct ext4_super_block super_block;
        char hostname[MAX_NAME_LEN];
        void *fd;

        ret = net_gethostname(hostname, MAX_NAME_LEN);
        if (ret) {
                if (ret == ECONNREFUSED)
                        strcpy(hostname, "N/A");
                else
                        GOTO(err_ret, ret);
        }

        memset(&super_block, 0, sizeof(super_block));

        super_block.s_vnodes_count = 3276800;
        super_block.s_blocks_count_lo = 13107200;
        super_block.s_blocks_per_group = 32768;
        super_block.s_vnodes_per_group = 8192;
        super_block.s_magic = 0xef53;
        super_block.s_state = 1; /* not requisite */
        super_block.s_errors = 1; /* not requisite */

        for (i = 0; i < 16; i++) {
                super_block.s_uuid[i] = diskinfo->diskid[i];
        }

        sprintf(super_block.s_volume_name, "lich-disk-disk%d", disk->idx);
        sprintf((char *)super_block.s_reserved, "cluster=%s;node=%s;type=data;disk=%d;", gloconf.uuid, hostname, disk->idx);
        
        fd = disk->u.spdk_fd;
        ret = disk->dop->io_pwrite(fd,(char *)&super_block, sizeof(super_block), SUPER_BLOCK_OFFSET);

        if (unlikely(ret != sizeof(super_block))) {
                ret = EIO;
                GOTO(err_ret, ret);
        }
        
        return 0;
err_ret:
        return ret;
}

static int spdk_diskmd_setdskinfo(const disk_t *disk, diskinfo_t *diskinfo)
{
        int ret;
        void *fd;

        diskinfo->idx = disk->idx;
        diskinfo->nid = *net_getnid();
        uuid_generate(diskinfo->diskid);
        memset(diskinfo->cluster, 0x0, MAX_NAME_LEN);
        strcpy(diskinfo->cluster, gloconf.uuid);

        diskinfo->crc = crc32_sum((void *)diskinfo + sizeof(uint32_t), sizeof(*diskinfo) - sizeof(uint32_t));
        DINFO("spdk set info uuid \n");
        
        fd = disk->u.spdk_fd;
        ret = disk->dop->io_pwrite(fd, (char *)diskinfo, sizeof(*diskinfo), DISKINFO_OFFSET);
    
        if (unlikely(ret != sizeof(*diskinfo))) {
                DWARN("the disk type is %d, return %d need %lu\n", disk->disk_type,ret, sizeof(*diskinfo));
                ret = EIO;
                GOTO(err_ret, ret);
        }

        ret = __spdk_diskmd_getinfo(NULL, disk, diskinfo);
        
        return 0;
err_ret:
        return ret;
}

static int spdk_diskmd_getinfo(const diskmd_t *diskmd, const disk_t *disk,
                            diskinfo_t *diskinfo)
{
        int ret;
        uint32_t crc;
        char path[MAX_PATH_LEN];
        void *fd;
        
        fd = disk->u.spdk_fd;
        ret = disk->dop->io_pread(fd, (char *)diskinfo, sizeof(*diskinfo), DISKINFO_OFFSET);
        if (unlikely(ret != sizeof(*diskinfo))) {
                DWARN("read disk[%u], ret %u %s\n", disk->idx, ret, strerror(ret));
                sprintf(path, "%s/disk/%d.disk", diskmd->home, disk->idx);
                unlink(path);
                EXIT(EAGAIN);
        }

        DINFO("spdk get info ok, len %lu buf addr is %p\n", sizeof(*diskinfo), diskinfo);

        crc = crc32_sum((void *)diskinfo + sizeof(uint32_t), sizeof(*diskinfo) - sizeof(uint32_t));
        if (unlikely(crc != diskinfo->crc)) {
                DWARN("read disk[%u], crc %x %x\n", disk->idx, crc, diskinfo->crc);
                EXIT(EIO);
        }

        return 0;
}
#endif

/*
static int __spdk_diskmd_destroy(disk_t *disk)
{
        int ret;
        struct ext4_super_block super_block;
        void *fd;

        memset(&super_block, 0x0, sizeof(struct ext4_super_block));        

        fd = disk->u.spdk_fd;
        ret = disk->dop->io_pwrite(fd, (char *)&super_block, sizeof(super_block), SUPER_BLOCK_OFFSET);
        if(unlikely(ret != sizeof(super_block))){
                ret = EIO;
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}

*/

/* pci_pool_disk_domain.bus.dev.func */
static int spdk_disk_load_disk(disk_t *disk, const char *home,  char *pool, uint64_t *disk_size)
{
        void *fd;
        int ret, count;
        struct stat stbuf;
        int bus,domain, device, func;
        char path[PATH_MAX], *ptr, *dev[4], *tmp[4],
                *pool_name, *disk_name, *picbus;

        YASSERT(disk->idx < DISK_MAX);

        sprintf(path, "%s/disk/%d.disk", home, disk->idx);

        ret = diskmd_real_path(path, &stbuf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO("initior the spdk device  %s\n", path);
        ptr = strstr(path, "pci");
        if(unlikely(ptr == NULL)){
                ret = ENODEV;
                GOTO(err_ret, ret);
        }

        count = 4;
        _str_split(path, '_', dev, &count);

        if (count != 4) {
                DINFO("disk spdk arg fail %d\n", count);
                return EINVAL;
        }

        pool_name = dev[1];
        disk_name = dev[2];
        picbus = dev[3];

        count = 4;
        _str_split(picbus, '.', tmp, &count);
        if (count != 4) {
                DINFO("disk spdk arg fail %d\n", count);
                return EINVAL;
        }

        domain = strtol(tmp[0], NULL, 16);
        bus = strtol(tmp[1], NULL, 16);
        device = strtol(tmp[2], NULL, 16);
        func = strtol(tmp[3], NULL, 16);

        fd = lookup_spdk_nvme_controller(domain, bus, device, func);
        if(unlikely(fd == NULL)){
                ret = ENODEV;
                GOTO(err_ret, ret);
        }

        disk->disk_fd = fd;
        disk->disk_type = __DISK_TYPE_SPDK_DISK__;

        strcpy(pool, pool_name);
        spdk_nvme_disk_size(fd, disk_size);

        DINFO("load disk[%u] %s tier %d size %ju.\n", disk->idx, disk_name, disk->tier, *disk_size);

        return 0;
err_ret:
        return ret;
}

static void spdk_disk_unload(disk_t *disk)
{
        return ;
}

static void spdk_disk_offline(disk_t *disk)
{
        disk->status |= __DISK_OFFLINE__;
        disk->disk_fd = NULL;

        yfree((void**)&disk);
}

int spdk_disk_writeable(disk_t *disk)
{
        (void)disk;

        return 1;
}

/*
static int __spdk_diskmd_writeable(disk_t *disk, char *path)
{
        uint32_t ret;
        (void)*path;
        if(unlikely(disk->dop == NULL))
                return ENODEV;

        ret = disk->dop->io_pwrite(disk->u.spdk_fd, gloconf.uuid, strlen(gloconf.uuid), DISK_WRITEABLE_OFFSET);
        if(likely(ret == strlen(gloconf.uuid)))
                return 0;
        
        return EIO;
}


static int __spdk_diskmd_get_fd(const disk_t *disk, void *fd)
{       
        if (unlikely(disk->u.spdk_fd == NULL)) {
                return ENODEV;
        }
        
        if (unlikely(disk->disk_type != __SPDK_DISK__)) {
                YASSERT(0);
        }
        
        *((uint64_t *)fd) = (uint64_t)disk->u.spdk_fd;

        if (unlikely(fd == NULL)) {
                DERROR("it is not right fd\n");
        }
        return 0;
}

static int __spdk_diskmd_get_iofd(const disk_t *disk, void *fd)
{
        return __spdk_diskmd_get_fd(disk, fd);
}*/

static int disk_spdk_connect(const disk_t *disk, disk_t *newdisk)
{
        core_t *core = core_self();

        DINFO("open disk[%d] core[%u]\n", disk->idx, core->hash);

        newdisk->disk_fd = disk->disk_fd;
        newdisk->disk_base_offset = disk->disk_base_offset;
        newdisk->idx = disk->idx;
        newdisk->dop = disk->dop;

        return 0;
}

static void disk_spdk_disconnect(disk_t *disk)
{
        core_t *core = core_self();

        DINFO("close disk[%d] core[%u]\n", disk->idx, core->hash);
}

static struct disk_op_t __spdk_disk_dop__ = {
        .io_pread = spdk_disk_io_pread,
        .io_pwrite = spdk_disk_io_pwrite,
        //.io_preadv = spdk_disk_io_preadv,
        //.io_pwritev = spdk_disk_io_pwritev,
        .aio_readv = spdk_disk_aio_read,
        .aio_writev = spdk_disk_aio_write,

        .writeable = spdk_disk_writeable,
        .offline = spdk_disk_offline,

        .open = spdk_disk_load_disk,
        .create_new = spdk_disk_create_new,
        .probe_check = spdk_disk_probe_check,
        .close = spdk_disk_unload,
        .connect = disk_spdk_connect,
        .disconnect = disk_spdk_disconnect,
};

struct disk_op_t * get_spdk_disk_ops()
{
     return  &__spdk_disk_dop__; 
}

//static int __spdk_disk_ops_init__ = 0;


#if 0

void * spdk_io_test(void *arg)
{
        (void)arg;
        buffer_t buf;
        char msg[4096];
        seg_t *seg;
        
        mbuffer_init(&buf, 4096);
        seg = (seg_t *)buf.list.next;
        memset(seg->handler.ptr, 0x11, 4096);
        memset(msg, 0x11, 4096);

        while (1) {
                if(dloc){
                        sleep(1);
                        DWARN("========io test begin====  %p \n", seg->handler.ptr);
                        if(memcmp(seg->handler.ptr, msg, 4096))
                                DWARN("iov test failed\n");
                        memset(seg->handler.ptr, 0x11, 4096);

                        if(memcmp(seg->handler.ptr, msg, 4096))
                                YASSERT(0);
                        DWARN("begin pwrite test\n");
                        memset(seg->handler.ptr, 0x00, 4096);
                        if(memcmp(seg->handler.ptr, msg, 4096))
                                YASSERT(0);
                }
                
                sleep(1);
        }      
}


void register_spdk_disk_op(struct disk_op_t **dop)
{
        *dop = &__spdk_disk_dop__;
        if (unlikely(*dop == NULL))
                YASSERT(0);
        
        if(__spdk_disk_ops_init__)
                return;
        
        register_disktype_ops(__SPDK_DISK__, &__spdk_disk_dop__);
        
        if (gloconf.spdk_test) {
                int ret;
                pthread_t th;
                pthread_attr_t ta;

                pthread_attr_init(&ta);
                //  pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

                ret = pthread_create(&th, &ta, spdk_io_test, NULL);
                if(unlikely(ret))
                        YASSERT(0);
        }
        __spdk_disk_ops_init__ = 1;

}

#endif
